/* Copyright (c) 2022-2022, LiWangQian<liwangqian@huawei.com> All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this list of
 *    conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright notice, this list
 *    of conditions and the following disclaimer in the documentation and/or other materials
 *    provided with the distribution.
 *
 * 3. Neither the name of the copyright holder nor the names of its contributors may be used
 *    to endorse or promote products derived from this software without specific prior written
 *    permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "libflexe/infra/event/broker.h"

#include <atomic>

#include "libflexe/infra/event/listener.h"
#include "libflexe/infra/event/topic.h"
#include "libflexe/infra/enumulator.h"
#include "libflexe/infra/error.h"

namespace libflexe::infra::event {

/**
 * Wraps an already-known value in a std::future that is ready immediately.
 *
 * The promise is constructed with allocator<Res> as its allocator argument.
 * The value is moved (not copied) into the promise, which avoids a redundant
 * copy of the by-value parameter and lets move-only result types be used.
 *
 * @param val  Value the returned future will yield.
 * @return     A future whose result has already been set to @p val.
 */
template <typename Res>
static inline std::future<Res> futurelize(Res val)
{
    std::promise<Res> p{std::allocator_arg, allocator<Res>{}};
    p.set_value(std::move(val));
    return p.get_future();
}

template <typename F>
class annoy_listener : public listener {
public:
    annoy_listener(F &&fun)
        : listener(annoy_name())
        , f{fun ? std::forward<F>(fun) : null_listener}
    {
    }

    event_result on_event(ievent &evt) noexcept override
    {
        return f(evt);
    }

private:
    static event_result null_listener(ievent &)
    {
        return event_result::skip;
    }

    static stl::string annoy_name()
    {
        static size_t idx = 0;
        return stl::string("annoy_listener#" + std::to_string(idx++));
    }

    F f;
};

/** Constructs a broker and stores @p name as its name. */
broker::broker(stl::string name)
    : name_(std::move(name))
{
}

int broker::setup()
{
    stl::string name{"broker-event"};
    auto broker_topic = stl::make_shared<topic>(name);
    if (broker_topic != nullptr && broker_topic->setup() == OK) {
        auto res = topics_.try_emplace(name, std::move(broker_topic));
        if (res.second) {
            return OK;
        }
    }
    return E_NULL_PTR;
}

/** Returns a reference to the stored broker name. */
const stl::string &broker::name() const noexcept
{
    return this->name_;
}

/**
 * Creates a topic named @p topic_name, registers it, and publishes a
 * new_topic_event on the "broker-event" topic.
 *
 * @return OK on success; E_OUT_MEMORY when the topic object could not be
 *         created; the error code of topic::setup() when that fails;
 *         E_EXISTED when a topic with this name is already registered.
 */
int broker::new_topic(const stl::string & topic_name)
{
    auto created = stl::make_shared<topic>(topic_name);
    if (created == nullptr) {
        return E_OUT_MEMORY;
    }

    if (auto rc = created->setup(); rc != OK) {
        return rc;
    }

    bool is_new = false;
    {
        // Hold mtx_ only for the insertion: pub_to_topic() below goes through
        // get_topic(), which takes the same mutex.
        std::lock_guard<std::mutex> guard{mtx_};
        is_new = topics_.try_emplace(topic_name, std::move(created)).second;
    }

    if (!is_new) {
        return E_EXISTED;
    }

    // The announcement is best effort; its result is deliberately ignored.
    (void)pub_to_topic("broker-event",
        stl::make_shared<new_topic_event>(this->name(), topic_name));
    return OK;
}

/**
 * Removes the topic registered under @p name.
 *
 * @return OK when the topic was found and erased, E_NOT_EXIST otherwise.
 */
int broker::del_topic(const stl::string &name)
{
    std::lock_guard<std::mutex> guard{mtx_};
    const auto pos = topics_.find(name);
    if (pos == std::end(topics_)) {
        return E_NOT_EXIST;
    }

    (void)topics_.erase(pos);
    return OK;
}

/**
 * Subscribes listener @p l to topic @p topic_name under @p listener_name.
 *
 * @return E_NULL_PTR when @p l is null; E_NOT_EXIST when the topic is not
 *         registered; otherwise the result of topic::sub().
 */
int broker::sub_from_topic(const stl::string &topic_name,
    const stl::string &listener_name, stl::shared_ptr<listener> l)
{
    if (l == nullptr) {
        return E_NULL_PTR;
    }

    auto target = get_topic(topic_name);
    if (!target) {
        return E_NOT_EXIST;
    }
    return target->sub(listener_name, std::move(l));
}

/**
 * Subscribes callback @p f to topic @p topic_name through an automatically
 * named annoy_listener.
 *
 * @return The generated listener name on success. An empty string when @p f
 *         is empty, the listener could not be created, the topic is not
 *         registered, or topic::sub() does not return OK.
 */
stl::string broker::sub_from_topic(const stl::string &topic_name,
    std::function<event_result(ievent&)> f)
{
    if (f == nullptr) {
        return "";
    }

    using callback_type = std::function<event_result(ievent&)>;
    auto anon = stl::make_shared<annoy_listener<callback_type>>(std::move(f));
    if (anon == nullptr) {
        return "";
    }

    auto target = get_topic(topic_name);
    if (!target) {
        return "";
    }

    if (target->sub(anon->name(), anon) != OK) {
        return "";
    }
    return anon->name();
}

/**
 * Unsubscribes @p listener_name from topic @p topic_name.
 * Does nothing when the topic is not registered.
 */
void broker::unsub_from_topic(const stl::string &topic_name,
    const stl::string &listener_name)
{
    if (auto target = get_topic(topic_name)) {
        target->unsub(listener_name);
    }
}

/**
 * Publishes @p evt on topic @p topic_name via topic::pub().
 *
 * @return E_NULL_PTR when @p evt is null; E_NOT_EXIST when the topic is not
 *         registered; otherwise the result of topic::pub().
 */
int broker::pub_to_topic(const stl::string &topic_name,
    stl::shared_ptr<ievent> evt)
{
    if (evt == nullptr) {
        return E_NULL_PTR;
    }

    auto target = get_topic(topic_name);
    if (!target) {
        return E_NOT_EXIST;
    }
    return target->pub(std::move(evt));
}

/**
 * Publishes @p evt on topic @p topic_name via topic::pub_async().
 *
 * @return E_NULL_PTR when @p evt is null; E_NOT_EXIST when the topic is not
 *         registered; otherwise the result of topic::pub_async().
 */
int broker::pub_to_topic_async(
    const stl::string &topic_name, stl::shared_ptr<ievent> evt)
{
    if (evt == nullptr) {
        return E_NULL_PTR;
    }

    auto target = get_topic(topic_name);
    if (!target) {
        return E_NOT_EXIST;
    }
    return target->pub_async(std::move(evt));
}

/** Reports whether a topic named @p topic_name is registered; takes mtx_ for the lookup. */
bool broker::has_topic(const stl::string & topic_name) const noexcept
{
    std::lock_guard<std::mutex> guard{mtx_};
    const auto pos = topics_.find(topic_name);
    return pos != topics_.end();
}

/**
 * Returns the names of all registered topics.
 *
 * Each name is pushed to the front of the list, so the result is in the
 * reverse of the order in which topics_ is iterated.
 */
stl::forward_list<stl::string> broker::list_topics() const
{
    stl::forward_list<stl::string> names;

    {
        // Copy the names under the lock; the list is returned after it is released.
        std::lock_guard<std::mutex> guard{mtx_};
        for (const auto &entry : topics_) {
            names.push_front(entry.first);
        }
    }

    return names;
}

/**
 * Looks up the topic registered under @p name while holding mtx_.
 *
 * @return The stored topic pointer, or nullptr when no such topic exists.
 */
broker::topic_ptr broker::get_topic(const stl::string &name)
{
    std::lock_guard<std::mutex> guard{mtx_};
    if (auto pos = topics_.find(name); pos != topics_.end()) {
        return pos->second;
    }
    return nullptr;
}

}
